共计 8009 个字符,预计需要花费 21 分钟才能阅读完成。
前言
博主最近在搞 Dubbo 分布式业务,相信来看此篇文章的开发朋友们对分布式这个名词肯定不陌生,在分布式业务中肯定就会牵涉到分布式事务,对于分布式事务博主开始听了这个词就觉得很难,但是其实还好,就是在整合 Dubbo 与 Seata 的其中踩了一些坑,并没有如同官方那么一帆风顺,那么本次就将整合步骤以及一些坑给大家爆出来,以防大家重蹈覆辙~
整合步骤
前提说明
我的业务框架是 Dubbo
+Mybatis-Plus
+Zookeeper
+Nacos
+Seata
,至于为什么要同时使用Zookeeper
+Nacos
呢,因为前期没有整合分布式事务的时候用的 zk 做的服务注册中心,后面可能进行移除,换为全局 Nacos
作为注册中心
安装 Nacos
关于 Zookeeper
我就不多于说明了,因为本文主要是讲述 Dubbo
与Seata
的集成方面的业务。
Nacos
我是用的 Docker
安装的,相关命令如下:
# 拉取 nacos 镜像
docker pull nacos/nacos-server
# 启动镜像
docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server
# 默认账户密码是:nacos/nacos
启动好 Nacos
之后直接访问 http://{ip}:8848/nacos/index.html
即可登录:
下载 / 配置 / 启动 Seata
进入到 https://github.com/seata/seata/releases 下载 seata 的发行版,我这里使用的 0.9.0 版本。
下载完成之后进行解压,其中 bin
目录下存放为启动脚本,conf
目录下存放为配置文件以及相关 SQL 和配置注入脚本,lib
目录下是 seata 的相关依赖。
打开文件我们可以看见可以使用 file、nacos、apollo、zk、consul 的注册和配置中心,本次主要是搭配 nacos 进行食用~
registry {
type = "nacos"
nacos {
serverAddr = "127.0.0.1" #nacos 地址 ip
namespace = "public" #nacos 的命名空间,默认为 public
cluster = "default" #集群,由于没有所以填写 default
}
file {name = "file.conf"}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1" #nacos 地址 ip
cluster = "default" #集群,由于没有所以填写 default
}
file {name = "file.conf"}
}
注意:在 registry 中 config 没有 namespace 属性,否则会出现服务启动失败或 no available!
接着我们修改 file.conf,其配置主要为:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroup_mapping.service-user-provider-group = "default"
vgroup_mapping.service-order-provider-group = "default"
vgroup_mapping.service-storage-provider-group = "default"
#这里是你的事务分组配置,格式为 vgroup_mapping.${YOUR_SERVICE_NAME}-group,当然 ${YOUR_SERVICE_NAME}-group
部分你可以自定
#下面是你的 seata 的服务列表
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
disableGlobalTransaction = false
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
report.retry.count = 5
tm.commit.retry.count = 1
tm.rollback.retry.count = 1
}
## transaction log store, only used in seata-server
store {
## store mode: file、db
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
}
## database store property
db {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
## 此处为你的数据库配置
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root"
password = "root"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
support {
## spring
spring {
# auto proxy the DataSource bean
datasource.autoproxy = false
}
}
配置好上述配置文件之后,我们将 conf 目录下的 db_store.sql
文件导入到我们的数据库,我这里的数据库名为seata
(上述配置文件可以看出)
接着我们再修改目录下的 nacos-config.txt
,这个文件其实就是将file.conf
翻译成 properties 格式的,这里我就不做过多的说明了,写好之后我们将配置写入到 nacos 中:
# 在 conf 目录下执行
sh nacos-config.sh {Nacos-Server-IP} #将 {Nacos-Server-IP} 换成你的 IP
写入成功之后,你会看到这样一行小绿字:
init nacos config finished, please start seata-server.
启动 seata-server
# 在 bin 目录下执行
sh seata-server.sh
# or
sh seata-server.sh -h 127.0.0.1 -p 8091 -m db
# 下面的是带参启动可以覆盖配置文件里面的数据
启动成功之后,你会看到 Nacos 的「控制台」-「服务列表」中会新增一项服务名为 serverAddr
的服务,如图:
业务整合
业务架构分为
service-order-provider # 订单服务
service-storage-provider # 库存服务
service-user-provider # 用户服务
service-user-consumer # 用户业务调用
导入日志数据表
将 seata 的 conf 目录下的 db_undo_log.sql
到你的业务数据库
业务配置
我们要在三个 provider
服务中写入如下配置:
file.conf 与 seata 的 conf 目录下一致
registry.conf 与 seata 的 conf 目录下一致
引入需要的依赖包
com.alibaba.nacos
nacos-client
1.1.4
io.seata
seata-all
0.9.0
进行 Seata 的配置,包括数据库资源 / 数据库代理设置 /SqlSessionFactory 等
/**
* @author Licoy.cn
* @version 2019/12/23
*/
@Configuration
public class SeataAutoConfig {@Value("${spring.application.name}")
private String appName;
@Autowired
private DataSourceProperties dataSourceProperties;
/**
* init durid datasource
*
* @Return: druidDataSource datasource instance
*/
@Bean
@Primary
public DruidDataSource druidDataSource(){DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(dataSourceProperties.getUrl());
druidDataSource.setUsername(dataSourceProperties.getUsername());
druidDataSource.setPassword(dataSourceProperties.getPassword());
druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
druidDataSource.setInitialSize(0);
druidDataSource.setMaxActive(180);
druidDataSource.setMaxWait(60000);
druidDataSource.setMinIdle(0);
druidDataSource.setValidationQuery("Select 1 from DUAL");
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
druidDataSource.setMinEvictableIdleTimeMillis(25200000);
druidDataSource.setRemoveAbandoned(true);
druidDataSource.setRemoveAbandonedTimeout(1800);
druidDataSource.setLogAbandoned(true);
try {Driver driver = new Driver();
druidDataSource.setDriver(driver);
} catch (SQLException e) {e.printStackTrace();
}
return druidDataSource;
}
@Bean
public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){return new DataSourceProxy(druidDataSource);
}
// 因为我使用的是 MybatisPlus,所以需要注入此部分
@Bean
public MybatisSqlSessionFactoryBean mybatisSqlSessionFactoryBean(DataSourceProxy proxy) throws IOException {MybatisSqlSessionFactoryBean mybatisPlus = new MybatisSqlSessionFactoryBean();
mybatisPlus.setDataSource(proxy);
mybatisPlus.setVfs(SpringBootVFS.class);
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
mybatisPlus.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
GlobalConfig globalConfig = new GlobalConfig();
GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
// 使用 ID_WORKER_STR,因为前后端分离使用整形,前端 JS 会有精度丢失
dbConfig.setIdType(IdType.ID_WORKER_STR);
globalConfig.setDbConfig(dbConfig);
mybatisPlus.setGlobalConfig(globalConfig);
MybatisConfiguration mc = new MybatisConfiguration();
// 对于完全自定义的 mapper 需要加此项配置,才能实现下划线转驼峰
mc.setMapUnderscoreToCamelCase(true);
mc.setDefaultScriptingLanguage(MybatisXMLLanguageDriver.class);
mybatisPlus.setConfiguration(mc);
return mybatisPlus;
}
@Bean
public GlobalTransactionScanner globalTransactionScanner(){return new GlobalTransactionScanner(this.appName, String.format("%s-group", this.appName));
}
}
业务接口
order 服务下有创建订单的接口
/**
* 创建订单
* @param order 订单
*/
ClientOrder create(ClientOrder order);
storage 服务下有减少库存的接口
/**
* 扣除库存
* @param productId 产品 ID
* @param total 扣除数量
*/
void decrease(String productId, Integer total);
user 服务下有减少账户余额以及购买的接口
/**
* 扣除账户余额
* @param userId 用户 ID
* @param money 扣除金额
*/
void decreaseMoney(String userId, BigDecimal money);
/**
* 购买产品
* @param productId 产品 ID
* @param uid 用户 ID
* @param totalCount 购买数量
*/
void buy(String productId, String uid, Integer totalCount);
创建订单 / 扣除库存 / 扣除账户余额这三个接口我就不在此展示了,因为都是基本的 CURD+ 业务判断,主要展示一下购买产品的业务接口实现,因为我们需要对此业务的过程中处理分布式事务:
@Override
@GlobalTransactional(name = "service-user-provider")
public void buy(String productId, String uid, Integer totalCount) {log.info("开始全局事务"+ RootContext.getXID());
ClientOrder order = new ClientOrder();
BigDecimal money = new BigDecimal(200);
order.setMoney(money);
order.setPid(productId);
order.setUid(uid);
order.setTotal(totalCount);
log.info("==== 创建订单 ====");
ClientOrder order1 = this.orderService.create(order);
log.info("==== 创建订单完成 ====");
log.info("==== 扣除库存 ====");
this.storageService.decrease(productId, totalCount);
log.info("==== 库存扣除完成 ====");
log.info("==== 扣除账户余额 ====");
this.decreaseMoney(uid, money);
log.info("==== 账户余额扣除完成 ====");
log.info("==== 购买成功 ====");
}
由上述代码可以看出,我们只需要添加一个 @GlobalTransactional 注解就可以进行分布式事务控制,其中 name 为该项目 spring.application.name
的值。
对于事务回滚,我们只需要将用户的余额设置为 0,这个时候扣除余额就会失败,那么业务失败,就会进行事务回滚,当操作完成之后我们看到数据库的订单和库存并没有创建和减少,就代表我们的分布式事务 Seata 配置完成并可以成功使用。
后记
在配置 Seata 的时候确实踩了不少坑,现在回头过来有些都已经忘却(当时只顾得解决 BUG,没有记录下来),所以此篇文章关于坑的展示并没有自己想的那么多,如果大家遇到了这方面的问题,可以在文章下方评论,博主将会尽可能的帮助你解决你的燃眉之急!